[SPARK-22618][CORE] Catch exception in removeRDD to stop jobs from dying#19836
[SPARK-22618][CORE] Catch exception in removeRDD to stop jobs from dying#19836brad-kaiser wants to merge 2 commits intoapache:masterfrom
Conversation
There was a problem hiding this comment.
Warning, not error, I'd imagine. rdd -> RDD. Why this construction with a partial function rather than write this inline below?
There was a problem hiding this comment.
Thanks for looking at my change. I changed the log to warning and "rdd" to "RDD". I had pulled out the partial function out because I felt like the expression was getting too deeply nested and hard to read. I certainly don't have to do that though.
889993f to
1152145
Compare
1152145 to
fbd2497
Compare
|
This looks reasonable, cc @cloud-fan |
| } | ||
|
|
||
| val futures = blockManagerInfo.values.map { bm => | ||
| bm.slaveEndpoint.ask[Int](removeMsg).recover(handleRemoveRddException) |
There was a problem hiding this comment.
personally I think
bm.slaveEndpoint.ask[Int](removeMsg).recover {
case e: IOException =>
logWarning(s"Error trying to remove RDD $rddId", e)
0 // zero blocks were removed
}
is more readable
|
ok to test |
|
LGTM |
jiangxb1987
left a comment
There was a problem hiding this comment.
LGTM only one minor issue.
|
|
||
| val futures = blockManagerInfo.values.map { bm => | ||
| bm.slaveEndpoint.ask[Int](removeMsg).recover { | ||
| case e: IOException => |
There was a problem hiding this comment.
According to what is described in the JIRA, should we only ignore the IOException if dynamic allocation is enabled?
There was a problem hiding this comment.
I think the logic for catching the error still applies even without dynamic allocation. If one of your nodes goes down while you happen to be in .unpersist, you wouldn't want your whole job to fail.
Dynamic allocation just makes this scenario more likely.
|
Test build #84553 has finished for PR 19836 at commit
|
|
Test build #84562 has finished for PR 19836 at commit
|
|
thanks, merging to master! |
…ing apache#19836 What changes were proposed in this pull request? I propose that BlockManagerMasterEndpoint.removeRdd() should catch and log any IOExceptions it receives. As it is now, the exception can bubble up to the main thread and kill user applications when called from RDD.unpersist(). I think this change is a better experience for the end user. I chose to catch the exception in BlockManagerMasterEndpoint.removeRdd() instead of RDD.unpersist() because this way the RDD.unpersist() blocking option will still work correctly. Otherwise, blocking will get short circuited by the first error. How was this patch tested? This patch was tested with a job that shows the job killing behavior mentioned above. @rxin, it looks like you originally wrote this method, I would appreciate it if you took a look. Thanks. This contribution is my original work and is licensed under the project's open source license.
What changes were proposed in this pull request?
I propose that BlockManagerMasterEndpoint.removeRdd() should catch and log any IOExceptions it receives. As it is now, the exception can bubble up to the main thread and kill user applications when called from RDD.unpersist(). I think this change is a better experience for the end user.
I chose to catch the exception in BlockManagerMasterEndpoint.removeRdd() instead of RDD.unpersist() because this way the RDD.unpersist() blocking option will still work correctly. Otherwise, blocking will get short circuited by the first error.
How was this patch tested?
This patch was tested with a job that shows the job killing behavior mentioned above.
@rxin, it looks like you originally wrote this method, I would appreciate it if you took a look. Thanks.
This contribution is my original work and is licensed under the project's open source license.